package redis

import (
	"context"
	"fmt"
	"gitee.com/aesoper/cache/factory"
	"github.com/go-redis/redis/v8"
	"sync"
	"time"
)

// ClusterCache is a factory.Cache implementation backed by a Redis Cluster
// client. The client is created lazily from cfg by init.
type ClusterCache struct {
	client *redis.ClusterClient // nil until init has run
	cfg    ClusterOptions       // connection settings and key prefix
	waiter sync.WaitGroup       // reserved for waiting on goroutines started by cache methods
	mux    sync.RWMutex         // held by the cache methods while they talk to client
}

// ClusterOptions carries the settings used to build the cluster client and
// to namespace keys.
type ClusterOptions struct {
	// Address is passed to the client as the list of cluster node addresses.
	Address []string `mapstructure:"address" json:"address"`
	// Password is passed to the client unchanged.
	Password string `mapstructure:"password" json:"password"`
	// MaxRetries is passed to the client unchanged.
	MaxRetries int `mapstructure:"maxRetries" json:"maxRetries"`
	// PoolSize is passed to the client unchanged.
	PoolSize int `mapstructure:"poolSize" json:"poolSize"`
	// Prefix is prepended (followed by ':') to every key handled by the cache.
	Prefix string `mapstructure:"prefix" json:"prefix"`
}

// init registers the cluster-backed cache constructor with the factory
// under the "redis_cluster" name.
func init() {
	const driverName = "redis_cluster"
	factory.Register(driverName, NewRedisClusterCache)
}

// NewRedisClusterCache returns an unconfigured ClusterCache as a
// factory.Cache. No client is created until the cache is first used or
// StartAndConfigure is called.
func NewRedisClusterCache() factory.Cache {
	return new(ClusterCache)
}

// WithRedisClusterOptions returns an option that replaces the whole
// configuration of a *ClusterCache with cfg. Applied to any other
// factory.Cache implementation, the option does nothing.
func WithRedisClusterOptions(cfg ClusterOptions) factory.CacheOption {
	return func(c factory.Cache) {
		target, isCluster := c.(*ClusterCache)
		if !isCluster {
			return
		}
		target.cfg = cfg
	}
}

// StartAndConfigure resets the configuration to its defaults (a single
// node at localhost:6379, key prefix "redis"), applies ops in order, and
// then initialises the client. It returns the error from init, if any.
func (c *ClusterCache) StartAndConfigure(ops ...factory.CacheOption) error {
	// Password, MaxRetries and PoolSize default to their zero values.
	c.cfg = ClusterOptions{
		Address: []string{"localhost:6379"},
		Prefix:  "redis",
	}

	for i := range ops {
		ops[i](c)
	}

	return c.init()
}

// Get fetches the raw bytes stored under key (after prefixing). It returns
// the init error if the client cannot be set up, otherwise whatever the
// client reports for the GET command.
func (c *ClusterCache) Get(key string) ([]byte, error) {
	err := c.init()
	if err != nil {
		return nil, err
	}

	c.mux.RLock()
	defer c.mux.RUnlock()

	fullKey := c.associate(key)
	cmd := c.client.Get(context.Background(), fullKey)
	return cmd.Bytes()
}

// Put stores val under key (after prefixing) with the given expiration
// timeout, returning the init error or the error of the SET command.
func (c *ClusterCache) Put(key string, val interface{}, timeout time.Duration) error {
	err := c.init()
	if err != nil {
		return err
	}

	c.mux.Lock()
	defer c.mux.Unlock()

	fullKey := c.associate(key)
	status := c.client.Set(context.Background(), fullKey, val, timeout)
	return status.Err()
}

// Delete removes key (after prefixing), returning the init error or the
// error of the DEL command.
func (c *ClusterCache) Delete(key string) error {
	err := c.init()
	if err != nil {
		return err
	}

	c.mux.Lock()
	defer c.mux.Unlock()

	fullKey := c.associate(key)
	removed := c.client.Del(context.Background(), fullKey)
	return removed.Err()
}

// Incr issues INCR for key (after prefixing), returning the init error or
// the error of the command. The resulting counter value is discarded.
func (c *ClusterCache) Incr(key string) error {
	err := c.init()
	if err != nil {
		return err
	}

	c.mux.Lock()
	defer c.mux.Unlock()

	fullKey := c.associate(key)
	result := c.client.Incr(context.Background(), fullKey)
	return result.Err()
}

// Decr issues DECR for key (after prefixing), returning the init error or
// the error of the command. The resulting counter value is discarded.
func (c *ClusterCache) Decr(key string) error {
	err := c.init()
	if err != nil {
		return err
	}

	c.mux.Lock()
	defer c.mux.Unlock()

	fullKey := c.associate(key)
	result := c.client.Decr(context.Background(), fullKey)
	return result.Err()
}

// IsExist reports whether key (after prefixing) is present in the cache.
// It returns false when the client cannot be initialised, when the EXISTS
// command fails, or when the command reports that no such key exists.
func (c *ClusterCache) IsExist(key string) bool {
	if err := c.init(); err != nil {
		return false
	}
	c.mux.RLock()
	defer c.mux.RUnlock()

	// EXISTS succeeds with a count of 0 for a missing key, so the absence
	// of an error alone does not mean the key is there.
	found, err := c.client.Exists(context.Background(), c.associate(key)).Result()
	return err == nil && found > 0
}

// ClearAll deletes every key that carries this cache's "<prefix>:" namespace.
//
// Each master node is scanned for matching keys, and each key found is
// deleted through the cluster client so the command is routed to the slot
// owner. Deletion is best-effort: a failed DEL does not stop the sweep, and
// the first such error is returned once the node has been fully scanned.
// A scan error is returned as is.
func (c *ClusterCache) ClearAll() error {
	if err := c.init(); err != nil {
		return err
	}
	c.mux.Lock()
	defer c.mux.Unlock()

	// Match exactly the keys produced by associate, not keys of another
	// cache whose prefix merely starts with the same characters.
	pattern := c.associate("*")

	return c.client.ForEachMaster(context.Background(), func(ctx context.Context, node *redis.Client) error {
		var firstErr error

		iter := node.Scan(ctx, 0, pattern, 0).Iterator()
		for iter.Next(ctx) {
			// Scanned keys already include the prefix, so they go to the
			// client directly; going through c.Delete would both prefix
			// them a second time and try to re-acquire c.mux.
			if err := c.client.Del(ctx, iter.Val()).Err(); err != nil && firstErr == nil {
				firstErr = err
			}
		}
		if err := iter.Err(); err != nil {
			return err
		}
		return firstErr
	})
}

// Close shuts down the underlying cluster client if one has been created
// and returns its error; with no client it is a no-op that returns nil.
func (c *ClusterCache) Close() error {
	if c.client == nil {
		return nil
	}
	return c.client.Close()
}

// associate builds the namespaced key "<prefix>:<originKey>" used for
// every command sent to Redis. originKey is formatted with the %s verb.
func (c *ClusterCache) associate(originKey interface{}) string {
	const layout = "%s:%s"
	prefix := c.cfg.Prefix
	return fmt.Sprintf(layout, prefix, originKey)
}

// init lazily creates the cluster client from c.cfg and pings the cluster.
//
// It returns the ping error for the call that creates the client. Once a
// client exists, later calls return nil without pinging again, even if
// that first ping failed.
//
// Access to c.client is synchronised through c.mux so that concurrent
// first calls create a single client instead of racing and leaking extras.
// Callers must not hold c.mux when calling init.
func (c *ClusterCache) init() error {
	// Fast path: the client is already there, a shared lock is enough.
	c.mux.RLock()
	ready := c.client != nil
	c.mux.RUnlock()
	if ready {
		return nil
	}

	c.mux.Lock()
	defer c.mux.Unlock()

	// Another goroutine may have created the client between the two locks.
	if c.client != nil {
		return nil
	}

	c.client = redis.NewClusterClient(&redis.ClusterOptions{
		Addrs:      c.cfg.Address,
		Password:   c.cfg.Password,
		MaxRetries: c.cfg.MaxRetries,
		PoolSize:   c.cfg.PoolSize,
	})

	return c.client.Ping(context.Background()).Err()
}
